package com.galeno.test

import com.galeno.utils.SparkUtil
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

/**
 * @Title: Test1
 * @Description: Demonstrates RDD map vs. mapPartitions vs. mapPartitionsWithIndex.
 * @author galeno
 * @date 2021/8/31 10:28
 */
/**
 * Compares the three per-partition/per-element RDD transformations —
 * `map`, `mapPartitions`, and `mapPartitionsWithIndex` — tagging each
 * element with its partition id and writing the results out as text files.
 */
object Test1 {
  def main(args: Array[String]): Unit = {
    val sc = SparkUtil.getSc
    val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)

    // map: the supplied function is invoked once PER ELEMENT.
    // (Original comment had a typo: "每处理一条数据就调用一次" — once per record.)
    val res = nums.map(x => {
      println("*********")
      val index = TaskContext.getPartitionId()
      (index, x * 100)
    })

    // mapPartitions: the supplied function is invoked once PER PARTITION
    // and receives that partition's elements as an iterator.
    val res2 = nums.mapPartitions(it => {
      // `val` instead of `var`: the partition id is read once and never reassigned.
      val index = TaskContext.getPartitionId()
      it.map(x => (index, x * 100))
    })

    // mapPartitionsWithIndex: like mapPartitions, but Spark passes the
    // partition index directly instead of going through TaskContext.
    val res4 = nums.mapPartitionsWithIndex((index, it) =>
      it.map(e => s"partitionIndex: $index,element:$e")
    )

    val words: RDD[String] = sc.makeRDD(Array("hadoop", "spark", "flink", "java"), 6)
    words.saveAsTextFile("data/out5")

    val res5 = words.mapPartitionsWithIndex((i, it) =>
      // Fixed: the original string was s"分区:+$i ..." — the '+' was a leftover
      // from string concatenation and printed literally in the output.
      it.map(e => s"分区:$i 元素:$e")
    )
    val arr = res5.collect()
    println(arr.toBuffer)

    res2.saveAsTextFile("data/out2")
    res.saveAsTextFile("data/out1")
    res4.saveAsTextFile("data/out4")

    // Keep the JVM alive so the Spark web UI (port 4040) can be inspected
    // before the application shuts down.
    Thread.sleep(100000)
  }

}
